// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/ProfileEvents.h>
#include <Common/TiFlashMetrics.h>
#include <Interpreters/Context.h>
#include <Poco/File.h>
#include <RaftStoreProxyFFI/ColumnFamily.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/Transaction/PartitionStreams.h>
#include <Storages/Transaction/ProxyFFI.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/SSTReader.h>
#include <Storages/Transaction/TMTContext.h>
#include <common/logger_useful.h>

namespace ProfileEvents
{
extern const Event DMWriteBytes;
}

namespace DB
{
namespace ErrorCodes
{
extern const int ILLFORMAT_RAFT_ROW;
} // namespace ErrorCodes

namespace DM
{
SSTFilesToDTFilesOutputStream::SSTFilesToDTFilesOutputStream( //
    BoundedSSTFilesToBlockInputStreamPtr child_,
    StorageDeltaMergePtr storage_,
    DecodingStorageSchemaSnapshotConstPtr schema_snap_,
    TiDB::SnapshotApplyMethod method_,
    FileConvertJobType job_type_,
    TMTContext & tmt_)
    : child(std::move(child_))
    , //
    storage(std::move(storage_))
    , schema_snap(std::move(schema_snap_))
    , method(method_)
    , job_type(job_type_)
    , tmt(tmt_)
    , log(&Poco::Logger::get("SSTFilesToDTFilesOutputStream"))
{
}

// Defaulted destructor; it does not itself call cancel(), so files created by this stream are not
// marked GC-able here. NOTE(review): presumably defined out-of-line in this .cpp so that members
// such as `dt_stream` are destroyed where DMFileBlockOutputStream is a complete type — confirm
// against the header.
SSTFilesToDTFilesOutputStream::~SSTFilesToDTFilesOutputStream() = default;

// Opens the upstream SST block reader and resets the per-run bookkeeping.
// The stopwatch is started only after the child has been opened, so the reported
// duration covers decoding/writing but not opening the child.
void SSTFilesToDTFilesOutputStream::writePrefix()
{
    child->readPrefix();

    // Fresh statistics for this conversion.
    commit_rows = 0;
    watch.start();
}

// Finishes the conversion:
//   1. closes the child reader;
//   2. seals the DTFile currently being written (if any), registers it with the store via
//      preIngestFile() and accounts its on-disk size into DMWriteBytes;
//   3. reports per-job-type metrics and logs a summary of the conversion.
void SSTFilesToDTFilesOutputStream::writeSuffix()
{
    child->readSuffix();

    if (dt_stream)
    {
        dt_stream->writeSuffix();
        const auto sealed_file = dt_stream->getFile();
        // Until it is ingested, the DTFile must stay out of reach of GC.
        assert(!sealed_file->canGC());

        // Register the DTFile in StoragePathPool so it can be restored later.
        const auto file_bytes = sealed_file->getBytesOnDisk();
        storage->getStore()->preIngestFile(sealed_file->parentPath(), sealed_file->fileId(), file_bytes);

        // Accounted into DMWriteBytes so that write amplification can be computed.
        ProfileEvents::increment(ProfileEvents::DMWriteBytes, file_bytes);

        dt_stream.reset();
    }

    const auto process_keys = child->getProcessKeys();
    // Keys from all column families are summed into a single metric value.
    if (job_type != FileConvertJobType::ApplySnapshot)
    {
        GET_METRIC(tiflash_raft_process_keys, type_ingest_sst).Increment(process_keys.total());
    }
    else
    {
        GET_METRIC(tiflash_raft_command_duration_seconds, type_apply_snapshot_predecode_sst2dt).Observe(watch.elapsedSeconds());
        GET_METRIC(tiflash_raft_process_keys, type_apply_snapshot).Increment(process_keys.total());
    }

    LOG_FMT_INFO(
        log,
        "Pre-handle snapshot {} to {} DTFiles, cost {}ms [rows={}] [write_cf_keys={}] [default_cf_keys={}] [lock_cf_keys={}]",
        child->getRegion()->toString(true),
        ingest_files.size(),
        watch.elapsedMilliseconds(),
        commit_rows,
        process_keys.write_cf,
        process_keys.default_cf,
        process_keys.lock_cf);
}

// Creates a new DTFile and opens `dt_stream` on it.
//
// Returns false (without creating anything) when the store cannot pre-allocate a path/id for the
// file, e.g. because the storage is being dropped or shut down. On success the new file is recorded
// in `ingest_files`. It is recorded *before* the output stream is opened, so that if opening the
// stream or writing its prefix throws, cancel() can still find the file and mark it GC-able instead
// of leaking it.
bool SSTFilesToDTFilesOutputStream::newDTFileStream()
{
    // Choose the DTFile layout according to the snapshot apply method.
    DMFileBlockOutputStream::Flags flags;
    switch (method)
    {
    case TiDB::SnapshotApplyMethod::DTFile_Directory:
        flags.setSingleFile(false);
        break;
    case TiDB::SnapshotApplyMethod::DTFile_Single:
        flags.setSingleFile(true);
        break;
    default:
        // Any other method keeps the default flags.
        break;
    }

    // The parent_path and file_id are generated by the storage.
    auto [parent_path, file_id] = storage->getStore()->preAllocateIngestFile();
    if (parent_path.empty())
    {
        // Cannot allocate a path and id for storing DTFiles (the storage may be dropped / shutdown).
        return false;
    }

    const bool single_file = flags.isSingleFile();
    auto dt_file = DMFile::create(file_id, parent_path, single_file, storage->createChecksumConfig(single_file));
    // Track the file before anything else can throw, so cancel() is able to clean it up.
    ingest_files.emplace_back(dt_file);

    LOG_FMT_INFO(
        log,
        "Create file for snapshot data {} [file={}] [single_file_mode={}]",
        child->getRegion()->toString(true),
        dt_file->path(),
        single_file);
    dt_stream = std::make_unique<DMFileBlockOutputStream>(tmt.getContext(), dt_file, *(schema_snap->column_defines), flags);
    dt_stream->writePrefix();
    return true;
}

void SSTFilesToDTFilesOutputStream::write()
{
    size_t last_effective_num_rows = 0;
    size_t last_not_clean_rows = 0;
    size_t cur_effective_num_rows = 0;
    size_t cur_not_clean_rows = 0;
    while (true)
    {
        Block block = child->read();
        if (!block)
            break;
        if (unlikely(block.rows() == 0))
            continue;

        if (dt_stream == nullptr)
        {
            // If can not create DTFile stream (the storage may be dropped / shutdown),
            // break the writing loop.
            if (bool ok = newDTFileStream(); !ok)
            {
                break;
            }
        }

        {
            // Check whether rows are sorted by handle & version in ascending order.
            SortDescription sort;
            sort.emplace_back(MutableSupport::tidb_pk_column_name, 1, 0);
            sort.emplace_back(MutableSupport::version_column_name, 1, 0);
            if (unlikely(block.rows() > 1 && !isAlreadySorted(block, sort)))
            {
                const String error_msg
                    = fmt::format("The block decoded from SSTFile is not sorted by primary key and version {}", child->getRegion()->toString(true));
                LOG_ERROR(log, error_msg);
                FieldVisitorToString visitor;
                const size_t nrows = block.rows();
                for (size_t i = 0; i < nrows; ++i)
                {
                    const auto & pk_col = block.getByName(MutableSupport::tidb_pk_column_name);
                    const auto & ver_col = block.getByName(MutableSupport::version_column_name);
                    LOG_FMT_ERROR(
                        log,
                        "[Row={}/{}] [pk={}] [ver={}]",
                        i,
                        nrows,
                        applyVisitor(visitor, (*pk_col.column)[i]),
                        applyVisitor(visitor, (*ver_col.column)[i]));
                }
                throw Exception(error_msg);
            }
        }

        // Write block to the output stream
        DMFileBlockOutputStream::BlockProperty property;
        std::tie(cur_effective_num_rows, cur_not_clean_rows, property.gc_hint_version) //
            = child->getMvccStatistics();
        property.effective_num_rows = cur_effective_num_rows - last_effective_num_rows;
        property.not_clean_rows = cur_not_clean_rows - last_not_clean_rows;
        dt_stream->write(block, property);

        commit_rows += block.rows();
        last_effective_num_rows = cur_effective_num_rows;
        last_not_clean_rows = cur_not_clean_rows;
    }
}

// Returns the file ids of every DTFile created by this stream, in creation order.
PageIds SSTFilesToDTFilesOutputStream::ingestIds() const
{
    PageIds file_ids;
    file_ids.reserve(ingest_files.size());
    for (const auto & dt_file : ingest_files)
        file_ids.emplace_back(dt_file->fileId());
    return file_ids;
}

// Best-effort, lightweight cleanup: marks every DTFile produced by this stream as GC-able so that
// it can be reclaimed later. A failure on one file is logged and ignored, and the remaining files
// are still processed.
void SSTFilesToDTFilesOutputStream::cancel()
{
    for (const auto & dt_file : ingest_files)
    {
        try
        {
            dt_file->enableGC();
        }
        catch (...)
        {
            // Deliberately swallowed: cancel() must not abort half-way through the file list.
            tryLogCurrentException(log, fmt::format("ignore exception while canceling SST files to DeltaTree files stream [file={}]", dt_file->path()));
        }
    }
}

} // namespace DM
} // namespace DB
